<?php
declare (strict_types = 1);

namespace app\command;

use app\common\exception\ApiException;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

/**
 *     php think mq:consumer {scene_name}
 *     php think mq:consumer member_oplog
 */
class RabbitMQConsumerRouter extends Command
{
    protected function configure()
    {
        $this->setName('mq:consumer')
            ->addArgument('scene', Argument::OPTIONAL, "消费者应用场景，必须和 config/rabbitmq.php 里面的key一直")
            ->addOption('msg', null, Option::VALUE_OPTIONAL, "消息体必须 json 格式字符串")
            ->setDescription('命令行路由器');
    }

    /**
     * @throws \ErrorException
     */
    protected function execute(Input $input, Output $output)
    {
        set_time_limit(0);
        $scene = trim($input->getArgument('scene'));
        if (empty($scene)) {
            $output->writeln('修正数据场景必须');
            exit(0);
        }
        $msg = $input->getOption('msg');
        if ( ! empty($msg)) {
            $msg = json_decode($msg, true);
            if ( ! is_array($msg)) {
                $output->writeln('消息体必须是json字符串');
                exit(0);
            }
        }

        $conf = config('rabbitmq.' . $scene);
        if (empty($conf)) {
            $output->writeln('不支持的消费者场景，请在./config/rabbitmq.php 文件里面配置');
            exit(0);
        }
        //注入类处理自己的逻辑
        $loc = app($conf['class']);

        // https://stackoverflow.com/questions/64013120/php-amqplib-read-write-timeout-and-heartbeat-relation
        $connConf   = config('rabbitmq.hosts');
        $connection = new AMQPStreamConnection(
            $host = $connConf['host'],
            $port = $connConf['port'],
            $user = $connConf['username'],
            $password = $connConf['password'],
            $vhost = $connConf['vhost'],
            $insist = false,
            $login_method = 'AMQPLAIN',
            $login_response = null,
            $locale = 'en_US',
            $connection_timeout = 3.0,
            $read_write_timeout = 250.0,
            $context = null,
            $keepalive = false,
            $heartbeat = 120,
        );
        $channel = $connection->channel();
        /**
         * Declares exchange
         *
         * @param string $exchange
         * @param string $type
         * @param bool $passive
         * @param bool $durable
         * @param bool $auto_delete
         * @param bool $internal
         * @param bool $nowait
         * @return mixed|null
         * https://help.aliyun.com/zh/apsaramq-for-rabbitmq/developer-reference/error-codes-returned-when-amqp-methods-are-called
         * channel.exchangeDeclare("${exchangeName}", "只能取值x-delayed-message", true, false, arguments);
         * ExchangeTypeNotSupport[x-delayed-message]
         */
        $channel->exchange_declare(
            $conf['exchange_name'],
            $conf['exchange_type'] ?? 'direct',
            $passive = false,
            $durable = true,
            $auto_delete = false,
            $internal = false,
            $nowait = false,
            $arguments = $conf['exchange_args'] ?? [],
        );
        // var_dump($conf);exit;
        $channel->queue_declare(
            $conf['queue_name'],
            $passive = false,
            $durable = true,
            $exclusive = false, // 如果设置为 true，表示队列是排他的，即只能被当前连接使用，并且在连接断开后队列会被删除。
            $auto_delete = false, // 如果设置为 true，表示在最后一个消费者断开连接后，队列会自动删除。
            $nowait = false, //  如果设置为 true，表示不等待服务器的确认响应，立即返回。此模式下，任何错误将不会得到通知。
            $arguments = $conf['queue_args'] ?? [],
        );
        //将队列名与交换器名进行绑定，并指定routing_key
        $channel->queue_bind(
            $conf['queue_name'],
            $conf['exchange_name'],
            $conf['route_key']
        );
        // var_dump($conf);exit;
        $callback = function ($msg) use ($loc) {
            echo " [x] Received " . date('Y-m-d H:i:s'), $msg->body, "\n";
            if ($msg->body === 'quit') {
                $msg->delivery_info['channel']->basic_cancel(
                    $msg->delivery_info['consumer_tag']
                );
            } else {
                // 处理接收到的消息
                $data = is_string($msg->body) ? json_decode($msg->body, true) : [];
                try {
                    $res = $loc->run($data);
                    //手动确认ack，确保消息已经处理
                    if ($res) {
                        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                    } else {
                        echo $loc->getError();
                        echo "\n";
                    }
                } catch (\Throwable $e) {
                    // var_dump($e->getTrace());
                    // 添加自定义异常处理机制
                    if ($e instanceof ApiException) {
                    } else {
                        // 其他错误交给系统处理
                        throw $e;
                    }
                }
            }
        };
        //设置消费成功后才能继续进行下一个消费
        $channel->basic_qos(0, 1, false);
        $channel->basic_consume(
            $conf['queue_name'],
            '', false, false, false, false, $callback);

        $output->writeln(date('Y-m-d H:i:s') . ' 开始执行消费者 ' . $scene);
        while (count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }

}
